package cn.tnar.yunpark.huasai.nbiot;

import cn.tnar.yunpark.huasai.HuasaiUtil;
import cn.tnar.yunpark.io.FlowableUdpServer;
import cn.tnar.yunpark.io.RetryForeverWithDelay;
import cn.tnar.yunpark.model.SpaceSensorStatus;
import cn.tnar.yunpark.model.SpaceStatus;
import cn.tnar.yunpark.service.SensorEventDispatcher;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;

/**
 * Created by tieh on 2017/6/21.
 */
@Component
public class HuasaiNbiotProcessor {

    private static final Logger log = LoggerFactory.getLogger(HuasaiNbiotProcessor.class);

    @Value("${tnar.localIp:#{null}}")
    String localIp;

    @Value("${tnar.huasai.nbiot.port}")
    int localPort;

    @Autowired
    SensorEventDispatcher dispatcher;

    public void start() {
        Scheduler scheduler = Schedulers.single();
        FlowableUdpServer.create(localIp, localPort)
                .doOnNext((data) -> {
                    try {
                        DatagramSocket socket = data.getSocket();
                        DatagramPacket p = data.getPacket();

                        HuasaiNbiotMessage msg = HuasaiNbiotMessage.decode(p.getData(), p.getLength());
                        log.info("收到" + msg);
                        if (msg instanceof HuasaiNbiotTrafficMessage) {
                            HuasaiNbiotTrafficMessage tMsg = (HuasaiNbiotTrafficMessage)msg;
                            if(tMsg.getStatus() == 0 || tMsg.getStatus() == 1) {
                                dispatcher.dispatch(new SpaceStatus(tMsg.getSensorId(), HuasaiUtil.toLbmStatus(tMsg.getStatus()), tMsg.getTime()));
                                dispatcher.dispatch(new SpaceSensorStatus(tMsg.getSensorId(), tMsg.getPower(), tMsg.getPowerShow(), tMsg.getRssi(), tMsg.getRssiShow()));
                                sendAck(socket, p.getSocketAddress(), ((HuasaiNbiotTrafficMessage) msg).getBatchNo());
                            }
                        } else if (msg instanceof HuasaiNbiotKeepAliveMessage) {
                            HuasaiNbiotKeepAliveMessage k = (HuasaiNbiotKeepAliveMessage)msg;
                            dispatcher.dispatch(new SpaceSensorStatus(k.getSensorId(), k.getPower(), k.getPowerShow(), k.getRssi(), k.getRssiShow()));
                            sendAck(socket, p.getSocketAddress(), 0);
                        }
                    } catch (HuasaiException e) {
                        // 忽略收到的包解析、处理失败时，继续收
                    } catch (Exception e) {
                        // 打印其他处理异常，然后继续收
                        log.info("处理地磁消息出错", e);
                    }
                })
                .doOnError(e -> log.error(e.getMessage()))
                .retryWhen(new RetryForeverWithDelay(15, TimeUnit.SECONDS, scheduler))
                .subscribeOn(scheduler)
                .subscribe();
    }

    private void sendAck(DatagramSocket socket, SocketAddress addr, int batchNo) {
        try {
            byte[] ackBuf = new HuasaiNbiotAck(batchNo).encode();
            DatagramPacket ack = new DatagramPacket(ackBuf, 0, ackBuf.length, addr);
            socket.send(ack);
        } catch (Exception e) {
            log.warn("发送ACK失败: " + e.getMessage());
        }
    }
}
